Spring Cloud Alibaba 微服务系列 - Seata

Seata 的简介

Seata 是阿里巴巴开源的一款高性能、易使用的分布式事务解决方案。它的核心价值在于,在微服务架构下,以极低的性能损耗和业务无感(侵入性极低)的方式,解决了跨数据库、跨服务的本地事务一致性问题,也就是实现了分布式系统下的 ACID 保证。另外,关于事务和分布式事务的理论知识可以参考之前的文章:《Spring 的本地事务》《分布式事务》。Seata 官网 文档,请参考 Seata 快速开始


三驾马车

在 Seata 的分布式事务世界里,TC(事务协调器)TM(事务管理器)RM(资源管理器) 是并肩作战的“三驾马车”。

  • TC (Transaction Coordinator) - 事务协调器。维护全局事务的状态,负责协调并驱动全局事务的提交(Commit)或回滚(Rollback)。在生产中,它是一个独立的 Seata-Server 集群。
  • TM (Transaction Manager) - 事务管理器。定义全局事务的边界。通常我们在业务的 Controller 或者 Service 方法上加一个 @GlobalTransactional 注解,这个微服务就化身成了 TM。它负责向 TC 申请开启、提交或回滚全局事务。
  • RM (Resource Manager) - 资源管理器。管辖分支事务的资源(也就是各个微服务本地的数据库连接)。它向 TC 注册分支事务,并汇报分支事务的执行状态,同时驱动本地事务的提交或回滚。

要彻底看懂它们,不能孤立地背诵定义,而是要将它们放入一个完整的分布式事务生命周期(从诞生到消亡)中。在聊它们的交互生命周期之前,我们先明确这三个核心组件在物理上的存活状态:

当一个用户点击 “立即下单”时,一条贯穿 TM、RM、TC 的分布式事务生命周期线被正式拉开。我们用 5 个核心节点来还原它的演变现场:

  • 孕育阶段:全局事务的诞生(TM $\rightarrow$ TC)
  • 现场推演:请求打到了订单微服务(TM)。方法上挂着 @GlobalTransactional。
    • 生命周期演变:
      • TM 发起呼叫:TM 拦截到请求,通过 Netty 网络长连接向远程的 TC 发出一条指令:“我要开创一个新时代,请给我颁发一个全局事务签证!”
      • TC 录入大盘:TC 收到请求,在自己的内存或数据库(global_table)里创建一条记录,并生成全网唯一的分布式事务 ID —— XID
      • TM 承接 XID:TC 把 XID 返回给 TM。此时,TM 将 XID 绑定到当前线程的 RootContext 中。
    • 当前状态:全局事务处于 Begin(开始)状态。
  • 蔓延阶段:XID 的跨服务传递(TM $\rightarrow$ RM)
  • 现场推演:订单服务(TM)开始通过 Feign 远程调用库存服务和账户服务(RM)。
    • 生命周期演变:
      • Spring Cloud 拦截器出手:Seata 的 Feign/RestTemplate 拦截器会自动把当前线程里的 XID 揪出来,塞进 HTTP 请求头(Header)的 TX_XID 中,顺着网络线游过去。
      • RM 接收绑定:库存服务(RM)收到请求,其内部的 Web 拦截器把 Header 里的 XID 搂出来,同样绑到自己本地线程的 RootContext 中。
  • 肉搏阶段:分支事务的注册与一阶段本地落盘(RM $\rightleftharpoons$ TC)
  • 现场推演:库存服务(RM)开始执行一阶段 Try 的业务(比如你写的 prepareDecrease 方法)。
    • 生命周期演变:
      • RM 向 TC 报备(分支注册):在真正改动本地资产前,RM 必须先向 TC 汇报:“大哥,我被 XID = xxx 的全局事务拉下水了,我要注册一个分支事务,我的资源名叫 storageTccAction”。
      • TC 记录分支:TC 收到,在自己的 branch_table 里记下一笔,并给 RM 颁发一个分支 BranchId。
      • RM 局部完结:RM 拿到 BranchId,在本地数据库写好你的 TccTransactionLog 控制表(状态为 1-TRY),执行冻结资产,然后高高兴兴地释放了本地的数据库物理锁。
    • 当前状态:一阶段(Phase One)顺利收官。
  • 审判阶段:由 TM 发起全局决议(TM $\rightarrow$ TC)
  • 现场推演:所有的远程微服务都调用完了,链路重新回到了订单服务(TM)的 @GlobalTransactional 方法出口。

  • 生命周期演变:

    • TM 提交/回滚申请:如果刚才整条链路没有报错,TM 就会向 TC 发送:“我这边一切顺利,申请全局 Commit”。如果中途抛了异常,TM 的切面会捕获并向 TC 发送:“有人翻车了,申请全局 Rollback”。
  • TM 的使命终结:发送完决议申请后,TM 将自己本地线程的 XID 清除。对于 TM 而言,它的分布式事务协调生命周期到此结束。

  • 清算阶段:TC 异步追杀与 RM 终结销账(TC $\rightarrow$ RM)
    • 现场推演:这是我们之前在日志里抓到现行的 “疯狂重试” 阶段。
    • 生命周期演变:
      • TC 广播号角:TC 收到 TM 的决议(比如 Commit),立刻转过身去,翻开自己的 branch_table,通过 Netty 长连接向所有参与过这个 XID 的 RM 节点下发异步广播(BranchCommitRequest)。
      • RM 听令二阶段落地:
        • 账户和库存的 RM 收到 TC 的大喇叭喊话,立刻唤醒本地的 confirm(或 cancel)方法。
        • RM 执行我们写好的 CAS 状态占坑(由 1 改为 2),划转真实资产,然后向 TC 回传回复:“报告大哥,我二阶段利落落地了(Success)”。
    • TC 结案销账(生命周期的终点):TC 收到所有 RM 的成功 Ack 之后,满意地把这条 XID 从自己的 global_table 和 branch_table 里彻底 DELETE 销账。

我们可以用一句话把 TC、TM、RM 的生命周期交织逻辑串联起来:TM 是始作俑者,它决定了全局事务的诞生与走向(提交或回滚); RM 是前线苦力,它在一阶段随用随锁、随完随释放,并在二阶段听从召唤进行数据清算; TC 是不灭的中央最高法院,它在后台俯瞰全局,用自身的数据持久化和长连接重试机制,拉长战线死磕 RM,直到逼迫所有 RM 达成最终一致性后,才亲手为该全局事务 “盖章销户”。这也就是为什么当你二阶段参数缺失返回 false 时,TC 的生命周期卡在 “清算阶段” 无法消亡,从而在后台疯狂重试你 RM 的根本原因!


四大模式

Seata 提供了四种武器,横向对比如下:

AT 模式是怎么回滚的?(undo_log 的秘密)

  • 在第一阶段,RM 拦截到业务 SQL(例如 update stock set count = 90 where id = 1)。
  • Seata 会在本地事务提交前,自动查询修改前的数据镜像(Before Image:count=100)和修改后的数据镜像(After Image:count=90),并将这两份镜像做成反向补偿 SQL,塞进本地数据库的 undo_log 表中。
  • 两个阶段如果成功,异步删除 undo_log;如果全局失败,RM 读取 undo_log 的 Before Image,将数据人肉还原回去。

AT模式又是怎么防止脏写的?(全局锁 Global Lock)。如果本地事务在一阶段提交了,此时另外一个没有加 Seata 的普通线程跑过来把这行数据改了,Seata 怎么防止回滚时发生覆盖脏写?

实际上,Seata 还引入了 TC 侧的 “全局锁(Global Lock)”。在一阶段本地事务提交前,RM 必须先拿到 TC 的全局锁。如果拿不到,说明有其他事务在操作,本地事务会不断重试。回滚时,也会校验 After Image 是否与当前数据库一致,如果不一致说明发生了脏写,会立刻触发报警,从而完美解决了读写隔离问题。

为什么在项目里选 Seata AT 模式?

核心是评估开发成本与性能之间的天平。AT 模式虽然在高并发下因为全局锁存在一定的性能妥协,但它提供了对业务代码零侵入的巨大优势,能让团队快速交付;而对于个别高并发的热点扣减链路,后续可以局部重构为 TCC 模式,实施精细化的性能压榨。


AT模式的测试案例

先对这个测试项目做一下简单介绍。这个简单的「分布式下单」案例基于 Spring Boot 3.x、MyBatis-Plus、OpenFeign、Nacos 注册中心以及 Seata 2.x(AT 模式) 构建。整个链路的物理拓扑架构如下:

  • 链路 zdemo-seata-order (下单/开启全局事务) $\rightarrow$ 远程调用 $\rightarrow$ zdemo-seata-store (扣减库存) & zdemo-seata-user (扣减余额)。
  • 另外 zdemo-seata-api 是公共模块,用来存放接口和公共类。
  • 项目的父项目请参考 《Spring Cloud Alibaba 基础案例》


api 公共模块

依赖配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.zdemo.scloud</groupId>
<artifactId>zdemo-scloud-parent</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>zdemo-seata-api</artifactId>

<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>

公共接口(OpenFeign 实现):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.zdemo.scloud.api.service;
// ...

@FeignClient(name = "zdemo-seata-store", contextId = "storeClient")
public interface StoreFeignClient {
@PostMapping("/storage/decrease")
Result<Void> decrease(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count);
}

@FeignClient(name = "zdemo-seata-user", contextId = "userClient")
public interface UserFeignClient {
@PostMapping("/account/decrease")
Result<Void> decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}

公共类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Data
public class OrderDTO {
private Long userId;
private String commodityCode;
private Integer count;
private BigDecimal money;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Result<T> {
private Integer code;
private String msg;
private T data;

public static <T> Result<T> success(T data) {
return new Result<>(200, "success", data);
}

public static <T> Result<T> error(String msg) {
return new Result<>(500, msg, null);
}
}


order 下单模块

依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<dependencies>
<dependency>
<groupId>com.zdemo.scloud</groupId>
<artifactId>zdemo-seata-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
</dependency>

<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>


配置文件

application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
server:
port: 8103
spring:
application:
name: zdemo-seata-order
datasource:
url: jdbc:mysql://192.168.1.251:3306/zdemo_seata_order?useSSL=false&serverTimezone=Asia/Shanghai&useUnicode=true&allowMultiQueries=true&rewriteBatchedStatements=true&connectTimeout=3000&socketTimeout=60000
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
# 显式指定并调优 Hikari 数据库连接池
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimum-idle: 20
maximum-pool-size: 50
idle-timeout: 600000
connection-timeout: 30000
max-lifetime: 1800000
connection-test-query: SELECT 1
cloud:
nacos:
discovery:
server-addr: 192.168.1.149:8848
username: nacos
password: nacos
namespace: prod-zdemo
cluster-name: DEFAULT

# MyBatis-Plus 配置
mybatis-plus:
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

# 🌟 Seata 2.0 原生核心配置
seata:
enabled: true
application-id: ${spring.application.name}
tx-service-group: zdemo_tx_group # 对应你 Seata-server 配置的事务组
service:
vgroup-mapping:
zdemo_tx_group: default # 映射到指定的 TC 集群
#grouplist:
# default: 192.168.1.149:8091 # 你的 Seata TC 服务端暴露的 netty 物理端口
# 实际的生产环境不再人肉硬编码写死 seata 的服务地址 192.168.1.149:8091,而是让 Client 客户端自动去 Nacos 里捞取名为 'seata-server' 的健康实例
registry:
type: nacos
nacos:
server-addr: 192.168.1.149:8848
namespace: prod-zdemo
group: SEATA_GROUP
username: nacos
password: nacos
application: seata-server
data-source-proxy-mode: AT # 开启标准的 AT 模式自动代理


启动类

SeataOrderApplication

1
2
3
4
5
6
7
8
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients(basePackages = "com.zdemo.scloud.api.service") // 强行扫描外包契约
public class SeataOrderApplication {
public static void main(String[] args) {
SpringApplication.run(SeataOrderApplication.class, args);
}
}


业务代码

OrderController

1
2
3
4
5
6
7
8
9
10
11
12
@RestController
@RequestMapping("/order")
public class OrderController {
@Resource
private OrderService orderService;

@PostMapping("/create")
public Result<Void> create(@RequestBody OrderDTO orderDto) {
orderService.createOrder(orderDto);
return Result.success(null);
}
}

OrderService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import com.zdemo.scloud.api.model.dto.OrderDTO;
import com.zdemo.scloud.api.service.StoreFeignClient;
import com.zdemo.scloud.api.service.UserFeignClient;
import com.zdemo.scloud.order.entity.Order;
import com.zdemo.scloud.order.mapper.OrderMapper;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class OrderService {

@Resource
private OrderMapper orderMapper;
@Resource
private StoreFeignClient storeFeignClient;
@Resource
private UserFeignClient userFeignClient;

/**
* 核心注解:@GlobalTransactional - 角色 TM 事务管理器,它的核心作用是开启和终结全局事务。
* 当请求走到 order 服务的这个注解时,Seata 的切面会人肉向 TC(协调器)发起一个网络请求:“老大,我要开辟一个新的全局事务,请赐予我一个全局唯一的 XID!”。接着,它负责在整个 Feign 远程调用链中人肉广播这个 XID。它是事务的起点和终结者。
*
* TM 侧(大总管):只有整个分布式业务链路的发起方入口(比如 OrderService.createOrder() 方法),才有资格冠以 @GlobalTransactional,用来开启和统领全局。
* RM 侧(地方官):下游被 Feign 调用的所有微服务提供方(如 store、user 的本地 Service),一律清爽地使用 Spring 自带的 @Transactional
*/
@GlobalTransactional(name = "zdemo-create-order-tx", rollbackFor = Exception.class)
public void createOrder(OrderDTO orderDto) {
log.info("==========================================================================");
log.info("🏁 触发分布式下单大闸!Seata 全局唯一事务 XID = {}", RootContext.getXID());

// 1. 本地第一阶段落盘:生成初始订单(status=0)
Order order = new Order();
order.setUserId(orderDto.getUserId());
order.setCommodityCode(orderDto.getCommodityCode());
order.setCount(orderDto.getCount());
order.setMoney(orderDto.getMoney());
order.setStatus(0);
orderMapper.insert(order);
log.info("Step 1 ➔ 本地一阶段订单数据初步插入成功!");

// 2. 远程 RPC 呼叫仓储服务
log.info("Step 2 ➔ 正在通过 OpenFeign 调用【仓储微服务】扣减库存...");
storeFeignClient.decrease(orderDto.getCommodityCode(), orderDto.getCount());

// 3. 远程 RPC 呼叫用户账户服务
log.info("Step 3 ➔ 正在通过 OpenFeign 调用【账户微服务】扣减余额...");
userFeignClient.decrease(orderDto.getUserId(), orderDto.getMoney());

// 4. 全链路正常无错,更新本地订单状态为 1 (完结)
order.setStatus(1);
orderMapper.updateById(order);
log.info("🎉 Step 4 ➔ 全链路业务校验通关,本地订单置为完结。全局事务准备自动二阶段 Commit!");
log.info("==========================================================================");
}
}

OrderMapper

1
2
3
4
5
6
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zdemo.scloud.order.entity.Order;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface OrderMapper extends BaseMapper<Order> {}

Order

1
2
3
4
5
6
7
8
9
10
11
@Data
@TableName("t_order")
public class Order {
@TableId(type = IdType.AUTO)
private Long id;
private Long userId;
private String commodityCode;
private Integer count;
private BigDecimal money;
private Integer status; // 0-创建中,1-已完结
}


store 库存模块

依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<dependencies>
<dependency>
<groupId>com.zdemo.scloud</groupId>
<artifactId>zdemo-seata-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>


配置文件

application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
server:
port: 8101
spring:
application:
name: zdemo-seata-store
datasource:
url: jdbc:mysql://192.168.1.251:3306/zdemo_seata_store?useSSL=false&serverTimezone=Asia/Shanghai&useUnicode=true&allowMultiQueries=true&rewriteBatchedStatements=true&connectTimeout=3000&socketTimeout=60000
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
# 显式指定并调优 Hikari 数据库连接池
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimum-idle: 20
maximum-pool-size: 50
idle-timeout: 600000
connection-timeout: 30000
max-lifetime: 1800000
connection-test-query: SELECT 1
cloud:
nacos:
discovery:
server-addr: 192.168.1.149:8848
username: nacos
password: nacos
namespace: prod-zdemo
cluster-name: DEFAULT

# MyBatis-Plus 配置
mybatis-plus:
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

# 🌟 Seata 2.0 原生核心配置
seata:
enabled: true
application-id: ${spring.application.name}
tx-service-group: zdemo_tx_group # 对应你 Seata-server 配置的事务组
service:
vgroup-mapping:
zdemo_tx_group: default # 映射到指定的 TC 集群
registry:
type: nacos
nacos:
server-addr: 192.168.1.149:8848
namespace: prod-zdemo
group: SEATA_GROUP
username: nacos
password: nacos
application: seata-server
data-source-proxy-mode: AT # 开启标准的 AT 模式自动代理


启动类

1
2
3
4
5
6
7
@SpringBootApplication
@EnableDiscoveryClient
public class SeataStoreApplication {
public static void main(String[] args) {
SpringApplication.run(SeataStoreApplication.class, args);
}
}


业务类

StorageController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
@RestController
@RequestMapping("/storage")
public class StorageController {
@Resource
private StorageService storageService;

@PostMapping("/decrease")
public Result<Void> decrease(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) {
log.info("▶▶▶ [仓储服务] 收到扣减库存请求. XID: {}", RootContext.getXID()); // 通过 RootContext 抓取 XID,能看到说明 Seata 事务上下文成功通过 Feign 偷渡过来了!
storageService.decrease(commodityCode, count);
return Result.success(null);
}
}

StorageService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Service
public class StorageService {

@Resource
private StorageMapper storageMapper;

@Transactional(rollbackFor = Exception.class)
public void decrease(String commodityCode, Integer count) {
UpdateWrapper<Storage> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("commodity_code", commodityCode)
.setSql("count = count - " + count);
storageMapper.update(null, updateWrapper);
}
}

StorageMapper

1
2
3
4
5
6
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zdemo.scloud.store.entity.Storage;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface StorageMapper extends BaseMapper<Storage> {}

Storage

1
2
3
4
5
6
7
8
@Data
@TableName("t_storage")
public class Storage {
@TableId
private Long id;
private String commodityCode;
private Integer count;
}


user 账户模块

依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<dependencies>
<dependency>
<groupId>com.zdemo.scloud</groupId>
<artifactId>zdemo-seata-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>


配置文件

application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
server:
port: 8102
spring:
application:
name: zdemo-seata-user
datasource:
url: jdbc:mysql://192.168.1.251:3306/zdemo_seata_user?useSSL=false&serverTimezone=Asia/Shanghai&useUnicode=true&allowMultiQueries=true&rewriteBatchedStatements=true&connectTimeout=3000&socketTimeout=60000
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
# 显式指定并调优 Hikari 数据库连接池
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimum-idle: 20
maximum-pool-size: 50
idle-timeout: 600000
connection-timeout: 30000
max-lifetime: 1800000
connection-test-query: SELECT 1
cloud:
nacos:
discovery:
server-addr: 192.168.1.149:8848
username: nacos
password: nacos
namespace: prod-zdemo
cluster-name: DEFAULT

# MyBatis-Plus 配置
mybatis-plus:
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

# 🌟 Seata 2.0 原生核心配置
seata:
enabled: true
application-id: ${spring.application.name}
tx-service-group: zdemo_tx_group # 对应你 Seata-server 配置的事务组
service:
vgroup-mapping:
zdemo_tx_group: default # 映射到指定的 TC 集群
registry:
type: nacos
nacos:
server-addr: 192.168.1.149:8848
namespace: prod-zdemo
group: SEATA_GROUP
username: nacos
password: nacos
application: seata-server
data-source-proxy-mode: AT # 开启标准的 AT 模式自动代理


启动类

1
2
3
4
5
6
7
@SpringBootApplication
@EnableDiscoveryClient
public class SeataUserApplication {
public static void main(String[] args) {
SpringApplication.run(SeataUserApplication.class, args);
}
}


业务类

AccountController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
@RestController
@RequestMapping("/account")
public class AccountController {

@Resource
private AccountService accountService;

@PostMapping("/decrease")
public Result<Void> decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) {
log.info("▶▶▶ [账户服务] 收到扣减余额请求. XID: {}", RootContext.getXID());
accountService.decrease(userId, money);
return Result.success(null);
}
}

AccountService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
@Service
public class AccountService {

@Resource
private AccountMapper accountMapper;

/**
* 核心注解:@Transactional 这里充当的是 RM 资源管理器角色。它是 Spring 自带的本地事务注解,负责管辖当前微服务内、当前数据库连接的 ACID 特性。
* 当 store 服务被 Feign 呼叫时,Spring 的事务管理器会人肉执行 connection.setAutoCommit(false),开启本地数据库事务。
*/
@Transactional
public void decrease(Long userId, BigDecimal money) {
// 分布式事务测试桩:金额大到超过 500 元时,本地扣完后故意抛出运行时异常,逼迫全局事务回滚!
if (money.compareTo(new BigDecimal("500")) > 0) {
log.error("[埋点激活] 触发预设余额不足惩罚,强行抛出运行时异常引发全局回滚!");
throw new RuntimeException("【账户中心】账户余额严重不足,扣款失败!");
}

UpdateWrapper<Account> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("user_id", userId)
.setSql("money = money - " + money);
accountMapper.update(null, updateWrapper);
}
}

AccountMapper

1
2
3
4
5
6
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zdemo.scloud.user.entity.Account;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface AccountMapper extends BaseMapper<Account> {}

Account

1
2
3
4
5
6
7
8
@Data
@TableName("t_account")
public class Account {
@TableId
private Long id;
private Long userId;
private BigDecimal money;
}


业务相关的库表

订单数据库 zdemo_seata_order:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
CREATE DATABASE IF NOT EXISTS `zdemo_seata_order` DEFAULT CHARACTER SET utf8mb4;
USE `zdemo_seata_order`;

-- 订单业务表
CREATE TABLE `t_order` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`user_id` bigint(20) DEFAULT NULL COMMENT '用户ID',
`commodity_code` varchar(255) DEFAULT NULL COMMENT '商品编码',
`count` int(11) DEFAULT '0' COMMENT '购买数量',
`money` decimal(11,2) DEFAULT '0.00' COMMENT '订单金额',
`status` int(11) DEFAULT '0' COMMENT '订单状态:0-创建中,1-已完结',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Seata AT 模式核心回滚日志表(每个业务库都必须有!)
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

商品仓储数据库 zdemo_seata_store:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
CREATE DATABASE IF NOT EXISTS `zdemo_seata_store` DEFAULT CHARACTER SET utf8mb4;
USE `zdemo_seata_store`;

-- 商品库存表
CREATE TABLE `t_storage` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL COMMENT '商品编码',
`count` int(11) DEFAULT '0' COMMENT '总库存',
PRIMARY KEY (`id`),
UNIQUE KEY `commodity_code` (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 初始化数据:初始化商品编码为 "Owlias-1.3" 的教程,库存 100 件
INSERT INTO `t_storage` (`id`, `commodity_code`, `count`) VALUES (1, 'Owlias-1.3', 100);

-- 注入 undo_log 表
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

用户账户数据库 zdemo_seata_user:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
CREATE DATABASE IF NOT EXISTS `zdemo_seata_user` DEFAULT CHARACTER SET utf8mb4;
USE `zdemo_seata_user`;

-- 用户账户余额表
CREATE TABLE `t_account` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`user_id` bigint(20) DEFAULT NULL COMMENT '用户ID',
`money` decimal(11,2) DEFAULT '0.00' COMMENT '账户余额',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 初始化数据:给用户 1 初始化 1000 元钱
INSERT INTO `t_account` (`id`, `user_id`, `money`) VALUES (1, 1, 1000.00);

-- 注入 undo_log 表
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

关于如何查看 rollback_info 字段的说明:

1
2
3
4
-- 查看 order 库的 undo_log.rollback_info
select CONVERT(rollback_info USING utf8) from zdemo_seata_order.undo_log where id=xxx;
-- 查看 store 库的 undo_log.rollback_info
select CONVERT(rollback_info USING utf8) from zdemo_seata_store.undo_log where id=xxx;

结果形如:

1
2
3
4
// order insert
{"@class":"io.seata.rm.datasource.undo.BranchUndoLog","xid":"192.168.1.149:8091:5450146059598175958","branchId":5450146059598175973,"sqlUndoLogs":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.undo.SQLUndoLog","sqlType":"INSERT","tableName":"t_order","beforeImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords","tableName":"t_order","rows":["java.util.ArrayList",[]]},"afterImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"t_order","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":-5,"value":["java.lang.Long",23]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"user_id","keyType":"NULL","type":-5,"value":["java.lang.Long",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"commodity_code","keyType":"NULL","type":12,"value":"Owlias-1.3"},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"count","keyType":"NULL","type":4,"value":20},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"money","keyType":"NULL","type":3,"value":["java.math.BigDecimal",2000.00]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"status","keyType":"NULL","type":4,"value":0}]]}]]}}]]}
// store update
{"@class":"io.seata.rm.datasource.undo.BranchUndoLog","xid":"192.168.1.149:8091:5450146059598176633","branchId":5450146059598176650,"sqlUndoLogs":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.undo.SQLUndoLog","sqlType":"UPDATE","tableName":"t_storage","beforeImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"t_storage","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":-5,"value":["java.lang.Long",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"count","keyType":"NULL","type":4,"value":94}]]}]]},"afterImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"t_storage","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":-5,"value":["java.lang.Long",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"count","keyType":"NULL","type":4,"value":92}]]}]]}}]]}


搭建 seata 服务端

项目中需要安装 Seata 服务端,也就是 TC 事务协调器。只在微服务里引入 seata-spring-boot-starter 只是让你的代码变成了 TM(事务管理器) 和 RM(资源管理器)。如果没有独立运行的 TC(Seata 服务端) 在后台坐镇,整个分布式事务在运行时连最基本的 “花名册注册 ”和 “全局锁发放 ”都无法完成。

在本地测试或沙盒环境里,大家喜欢用 File 模式(无脑存本地文件,单机运行)。但在真实的生产环境中,为了应对高并发、高可用(HA)以及容灾,通常需要:

  • TC 必须集群化(Clustering):生产线至少部署 2~3 台 Seata-Server 节点组成集群,挂载到同一个 Nacos 注册中心上,实现无状态的负载均衡与故障转移。
  • 存储模式必须是 DB 或 Redis 模式:绝对禁止 File 模式!多台 Seata-Server 必须共享同一个外部数据库(zdemo_seata_tc)或者高性能 Redis 哨兵/集群,用来共享保存全局事务状态和全局行锁。
  • 配置统一交由 Nacos Config 管辖:Seata 的所有动态调优参数(如锁重试次数、心跳间隔)全部人肉持久化在 Nacos 配置中心,绝不写死在本地。

这里我们 seata 数据库采用 mysql 来实现,配置文件简单起见先使用本地的,seaa服务注册到 nacos,方便 TC 客户端的调用。


准备 TC 库表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
CREATE DATABASE IF NOT EXISTS `zdemo_seata_tc` DEFAULT CHARACTER SET utf8mb4;
USE `zdemo_seata_tc`;

-- 1. 全局事务会话表(存放全局事务 ID、状态等)
CREATE TABLE IF NOT EXISTS `global_table` (
`xid` VARCHAR(128) NOT NULL, -- 全局事务ID。格式为 IP:Port:递增序列,用来在全网微服务间跨网络传递。
`transaction_id` BIGINT NOT NULL, -- TC 内部维护的全局事务数字 ID。 注意,它是由 TC 颁发的,用来做内部的高效索引和数值关联(有独立索引 idx_transaction_id),不是本地 MySQL 的那个 innodb_trx 的 id。
`status` TINYINT NOT NULL, -- 全局事务状态码。(例如:1: Begin 开启, 2: Committing 提交中, 6: Rollbacking 回滚中, 13: TimeoutRollbacking 超时回滚中等)。TC 后台的定时线程会根据这个状态来判断是该通知 RM 去 Confirm 还是 Cancel。
`application_id` VARCHAR(32) DEFAULT NULL, -- 始作俑者的应用服务名。 比如你的订单服务叫 demo-order-service,方便在控制台和日志中一眼定位到是谁发起的全局事务。
`transaction_service_group` VARCHAR(32) DEFAULT NULL, -- 事务服务组。 对应你配置的 zdemo_tx_group。用来做多租户或集群隔离。
`transaction_name` VARCHAR(128) DEFAULT NULL, -- 全局事务的方法名/别名。 通常默认是你 @GlobalTransactional 标记的那个 Java 方法全路径或自定义名称,排查链路时极有用。
`timeout` INT DEFAULT NULL, -- 全局事务超时时间(单位:毫秒)。 如果超过这个时间二阶段还没完结,TC 就会根据该字段单方面宣判事务超时并强制启动回滚流程。
`begin_time` BIGINT DEFAULT NULL, -- 全局事务开启的时间戳(毫秒)。 用来计算当前事务是否已经超时(当前时间 - begin_time > timeout)。
`application_data` VARCHAR(2000) DEFAULT NULL, -- 全局上下文附加参数。 预留的扩展字段。可以存放一些全局性的上下文元数据(打包成 JSON 串)。
`gmt_create` DATETIME DEFAULT NULL, -- 事务创建时间。
`gmt_modified` DATETIME DEFAULT NULL, -- 状态最后修改时间。
PRIMARY KEY (`xid`), -- 全局事务ID充当主键
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), -- 该联合索引是 TC 后台定时扫描线程的黄金索引,专门用来捞出那些卡在中间状态、或者过期的死账进行异步重试。
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 2. 分支事务会话表(存放各微服务一阶段提交的分支事务信息)
CREATE TABLE IF NOT EXISTS `branch_table` (
`branch_id` BIGINT NOT NULL, -- 主键。分支事务唯一ID。由 TC 生成并颁发给 RM,你在二阶段日志里抓到现行的 branchId=54501460595... 就是它。
`xid` VARCHAR(128) NOT NULL, -- 外键关联。 挂靠的全局事务 ID。表明自己是哪一个老大的“小弟”。(有索引 idx_xid,方便 TC 通过 XID 一把捞出所有要清理的分支)。
`transaction_id` BIGINT DEFAULT NULL, -- 对应的全局事务数字 ID(冗余自 global_table,为了提升多表关联查询性能)。
`resource_group_id` VARCHAR(32) DEFAULT NULL, -- 资源分组 ID。 预留字段,通常默认和资源 ID 一致或为空。
`resource_id` VARCHAR(512) DEFAULT NULL, -- 资源 ID。 * AT 模式下是数据库连接串的 URL/数据库名(如 jdbc:mysql://.../store_db);TCC 模式下是你在接口上声明的 TCC Action 名字(比如你日志里的 storageTccAction)。TC 靠它才能精准找到该通知哪个资源去执行二阶段。
`branch_type` VARCHAR(8) DEFAULT NULL, -- 分布式事务模式。 常见的有 AT、TCC、SAGA、XA。这决定了 TC 在二阶段发网络广播时采用什么数据协议。
`status` TINYINT DEFAULT NULL, -- 分支事务的局部状态。(如 1: Registered 已注册, 3: PhaseTwo_CommitFailed_Retryable 二阶段提交失败待重试)。你之前参数缺失导致二阶段返回 false 时,这个字段就会被 TC 改为重试状态。
`client_id` VARCHAR(64) DEFAULT NULL, -- 客户端标识。记录了当前分支事务是由哪台具体 IP 和端口的机器(你的微服务节点)汇报上来的,方便二阶段精准推送。
`application_data` VARCHAR(2000) DEFAULT NULL, -- 二阶段的“续命参数盘”
`gmt_create` DATETIME DEFAULT NULL,
`gmt_modified` DATETIME DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 3. 全局行锁表(AT 模式读写隔离的核心防御,锁死物理行)
-- AT 模式一阶段本地事务就提交了(物理锁释放了)。为了防止别的并发非事务线程或者别的分布式事务跑进来把数据改了(造成脏写/破坏隔离性),Seata 会将要修改的表名+主键拼接成一个全局行锁,扔到这张表里。
CREATE TABLE IF NOT EXISTS `lock_table` (
`row_key` VARCHAR(128) NOT NULL, -- 主键。 格式为 resource_id#table_name#pk_value(如:store_db#t_storage#1001)。代表某个数据库的某张表的主键为 1001 的行被锁了。由于是主键,全网只要发生锁冲突,第二笔一阶段注册就会在这里撞车爆出全局锁冲突异常。
`xid` VARCHAR(128) DEFAULT NULL, -- 持有这个行锁的全局事务 XID。
`transaction_id` BIGINT DEFAULT NULL, -- 持有该锁的全局事务数字 ID。
`branch_id` BIGINT DEFAULT NULL, -- 到底是哪个具体的子分支(哪个微服务)锁死这行数据的。(有索引 idx_branch_id)。
`resource_id` VARCHAR(512) DEFAULT NULL, -- 对应的数据库资源(数据源 ID)。
`table_name` VARCHAR(32) DEFAULT NULL, -- 被锁的目标业务表名。
`pk` VARCHAR(36) DEFAULT NULL, -- 被锁的目标业务主键真实值(如 1001)。
`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking', -- 行锁状态。
`gmt_create` DATETIME DEFAULT NULL,
`gmt_modified` DATETIME DEFAULT NULL,
PRIMARY KEY (`row_key`),
KEY `idx_status` (`status`),
KEY `idx_branch_id` (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 4. 在 Seata 1.X 时代,数据库模式(mode: db)只需要三张核心表:global_table、branch_table 和 lock_table。
-- 但是在你目前使用的 Seata 2.0.0 中,为了支持 TC 集群的高可用防脑裂,引入了基于数据库的分布式锁机制,因此必须额外增加第四张表:distributed_lock。
CREATE TABLE IF NOT EXISTS `distributed_lock`
(
`lock_key` VARCHAR(128) NOT NULL, -- 主键。 锁的标识名。比如 TC 的定时回滚任务锁名为 AsyncCommitting 或 RetryRollbacking。
`lock_value` VARCHAR(128) NOT NULL, -- 抢到锁的 TC 实例标识。 通常是当前抢到锁的那个 Seata-Server 节点的 IP:Port。表明当前这个定时任务被哪台 TC 承包了。
`expire` BIGINT NOT NULL, -- 锁的过期时间戳(毫秒值)。
PRIMARY KEY (`lock_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

-- 顺手塞入一条 Seata TC 初始化所需的事务超时检查锁底物
INSERT IGNORE INTO `distributed_lock` (`lock_key`, `lock_value`, `expire`)
VALUES ('AsyncCommitting', ' ', 0),
('RetryCommitting', ' ', 0),
('RetryRollbacking', ' ', 0),
('TxTimeoutCheck', ' ', 0);


安装 Seata-Server

前往 Seata 官方下载 页面,下载适配的二进制压缩包(例如 seata-server-2.0.0.tar.gz),并解压到你的物理服务器(或 CentOS VM)中:

1
2
$ tar -zxvf seata-server-2.0.0.tar.gz -C /usr/local/
$ cd /usr/local/seata/conf

注意:在官方给出的 2.0.0 版本中,lib 路径下 jdbc 有多个版本,需要根据使用的 mysql 数据库版本选择并删掉哪些不用的版本,不然会不断有 Caused by: java.sql.SQLException: Unknown system variable ‘query_cache_size’ 的报错。

修改 Seata 服务端核心配置文件:在 conf 目录下,2.x 版本已经极简化。我们直接编辑 application.yml(或者 application.properties),把它的运行模式由默认的单机文件版人肉魔改为生产级的 Nacos 发现 + DB 存储模式(可以参考官方在同一文件夹中的示例文件 application.example.yml,在此基础上修改),我的配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
server:
port: 7091

spring:
application:
name: seata-server

logging:
config: classpath:logback-spring.xml
file:
path: ${log.home:${user.home}/logs/seata}

console:
user:
username: seata
password: seata

seata:
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/metadata/v1/**

config:
# support: nacos 、 consul 、 apollo 、 zk 、 etcd3
type: file

registry:
# support: nacos 、 eureka 、 redis 、 zk 、 consul 、 etcd3 、 sofa
type: nacos
preferred-networks: 192.168.*
nacos:
application: seata-server
server-addr: 192.168.1.149:8848
group: SEATA_GROUP
namespace: prod-zdemo
cluster: default
username: nacos
password: nacos

store:
# support: file 、 db 、 redis 、 raft
mode: db
session:
mode: db
lock:
mode: db
file:
dir: sessionStore
max-branch-session-size: 16384
max-global-session-size: 512
file-write-buffer-cache-size: 16384
session-reload-read-size: 100
flush-disk-mode: async
db:
datasource: druid
db-type: mysql
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://192.168.1.251:3306/zdemo_seata_tc?useSSL=false&serverTimezone=Asia/Shanghai
user: root
password: 123456
min-conn: 10
max-conn: 100
global-table: global_table
branch-table: branch_table
lock-table: lock_table
distributed-lock-table: distributed_lock
query-limit: 1000
max-wait: 5000

metrics:
enabled: false
registry-type: compact
exporter-list: prometheus
exporter-prometheus-port: 9898

启动并查看日志:

1
2
$ /usr/local/seata/bin/seata-server.sh -h 192.168.1.149 -p 8091 -m db
$ tail -f /root/logs/seata/seata-server.8091.all.log

前端控制台页面(用户名密码 seata/seata):

1
$ curl http://192.168.1.149:7091


XA 模式的实现案例

在微服务分布式事务中,XA 模式是一种强一致性(Strong Consistency)的解决方案。它与上面 AT 模式的不同点在于:

如果你现在的场景是银行转账、涉及金钱审计等对账严格、不允许半点中间状态暴露的业务,则建议使用 XA 模式;如果是高并发的电商下单、商品扣库存,建议保留原有的 AT 模式。

在 Spring Cloud 2023 + Seata 2.0 架构中,将默认的 AT 模式切换为 XA 模式非常简单,核心逻辑在于更换数据源代理类型。它的底层依赖于数据库本身对 XA 规范(两阶段提交,2PC)的原生支持(如 MySQL 5.7+ 默认支持)。XA 模式的两阶段提交运转逻辑:

  • 第一阶段(Prepare): 业务微服务(RM)执行业务 SQL,但不提交事务。Seata 代理数据源会向数据库发送 XA START、XA END 和 XA PREPARE。此时数据库资源(如行锁)会被真正锁住。
  • 第二阶段(Commit/Rollback): 事务管理器(TM)根据所有微服务的执行结果,通知 Seata TC。TC 如果下发 Commit,各个数据库执行 XA COMMIT 真正落盘并释放锁;如果任何一个服务失败,TC 下发 Rollback,各个数据库执行 XA ROLLBACK 回滚。

由于我们使用的是 Seata 2.0.0,官方已经极大地简化了模式切换。你只需要在订单微服务(Order)和 库存微服务(Store)以及账户微服务(User)中做以下两步调整:

第一步:在 application.yml 中修改数据源代理模式

1
2
seata:
data-source-proxy-mode: XA # 核心切换:将默认的 AT 改为 XA

第二步:确保数据库驱动支持 XA。因为 XA 模式是由数据库驱动直接与 MySQL 通信完成的,这就要求我们前面大换血时引入的 com.mysql.cj.jdbc.Driver 必须到位。同时,确保微服务连接的 MySQL 账号拥有 XA 事务的权限:

1
2
3
-- 如果你的微服务数据库账号不是 root,需要赋予 XA_RECOVER_ADMIN 权限(MySQL 8.0+ 要求)
GRANT XA_RECOVER_ADMIN ON *.* TO '你的数据库用户名'@'%';
FLUSH PRIVILEGES;

XA 模式对业务代码是完全无侵入的。你在 zdemo-seata-order 的 Service 方法上原有的 @GlobalTransactional 注解不需要做任何修改:

1
2
3
4
@GlobalTransactional(name = "zdemo-create-order-tx", rollbackFor = Exception.class)
public void createOrder(OrderDTO orderDto) {
// ...
}


TCC 模式的实现案例

代码注意事项

在分布式事务中,TCC 模式 (Try-Confirm-Cancel) 属于典型的业务层两阶段提交。它与 AT、XA 模式最大的不同在于:Seata 框架不再帮你自动代理数据源、不再帮你自动生成回滚日志,而是把事务的控制权完全交给了你的业务代码。正因为它的自由度极高,所以在实际编码时,需要考虑几个极其致命的 “隐形陷阱”。

  1. 业务悬挂(Anti-Suspension)
    • 现象:某一微服务的 Try 请求因为网络拥堵严重超时,分布式事务管理器(TM)以为该节点失败了,于是对整个事务下发了 Cancel 指令。结果 Cancel 执行完后,那个迟到的 Try 请求才真正到达服务器。
    • 后果:此时如果没有防御,Try 方法会再次执行,成功锁住一笔业务资源。但由于 Cancel 已经走完了,这笔资源将永远无法被释放(发生内存/资源泄露)。
    • 解法:在执行 Try 之前,必须先去交易记录表检查,当前全局事务 ID(XID)是否已经执行过 Cancel 或 Confirm。如果是则直接拦截并报错,绝不执行 Try。
  2. 空回滚(Null-Cancel)
    • 现象:分布式事务在执行 Try 的时候,因为网络抖动或者服务刚启动直接崩溃,导致 Try 方法压根就没有被调用成功。这时候全局事务失败,TC 会顺理成章地下发 Cancel 指令。
    • 后果:Cancel 方法此时被调用了,但它发现本地啥都没有处理过。如果你在 Cancel 里写的是 update account set balance = balance + 100(无脑加回),就会平白无故多给用户退了 100 块钱。
    • 解法:Cancel 接口被触发时,首先要检查 Try 阶段的业务日志/事务记录是否存在。如果不存在,说明 Try 根本没成功,此时 Cancel 应当什么都不做,直接返回成功(即空回滚)。
  3. 幂等性(Idempotence)
    • 现象:网络超时触发了 RPC 框架的自动重试,或者 Seata TC 发送的 Confirm / Cancel 请求由于网络丢包,在未收到 ACK 的情况下重复发送。
    • 后果:如果 Confirm 连续扣了两次钱,或者 Cancel 连续加了两次库存,业务数据直接崩盘。
    • 解法设计一张事务控制表(内含 xid, branch_id, state)。利用数据库的唯一索引(Unique Key)或者 state 的 CAS 状态机更新(例如:update tcc_log set state = ‘CONFIRMED’ where xid = ? and state = ‘TRY’)。只有更新成功的那个请求才能真正执行二阶段的业务。
  4. 数据并发隔离与资产保护(Data Isolation)
    • 现象:在 AT 或 XA 模式下,有数据库行锁或 Seata 全局锁帮你把资源死死扣住,外界无法修改。但 TCC 是一阶段直接把本地事务提交了。
    • 后果:比如用户的账户余额有 100 元,Try 阶段检查通过,准备买 80 元的东西,但 Try 并没有真正扣钱,只是在内存或一个冻结表里记录了一下。在 Confirm 还没下发前,用户通过另一个普通线程迅速提现了 50 元(此时余额变 50)。等 Confirm 真正下发执行扣减 80 元时,账户直接变成了 -30 元(发生业务资损、负资产)。
    • 解法:TCC 的 Try 阶段绝对不能仅仅做 SELECT 校验,必须引入“冻结资金”或“预留库存”的物理概念。
      • 不要直接在 Try 里看 balance > 80。
      • 而是应该在主表里 balance = balance - 80, freeze_balance = freeze_balance + 80,或者引入一张单独的 freeze_record 表,把资源在 Try 阶段通过数据库行锁硬锁住。
  5. 性能与脏数据清理(异步与降级机制)
    • 现象:如果分布式事务的二阶段(Confirm 或 Cancel)因为目标数据库宕机长期失败,Seata 会按照指数退避算法不断发起重试。
    • 后果:如果重试了 100 次数据库还没好,这笔被“预留”或“冻结”的业务资源(比如某张电影票、某笔资金)就会被无限期挂起,直接影响后续正常用户的购买。
    • 解法:超时自动解锁,冻结表或预留表里必须带有 expire_time(过期时间字段);或者编写异步的监控脚本,当发现某笔 TCC 事务处于 TRY 状态超过 1 小时,且 TC 没有下文时,自动触发报警或者根据业务策略强行进行本地 Cancel 冲正。


具体实现案例

将上述 “下单-扣库存-扣余额” 案例改造为 TCC 模式,我们需要对代码结构进行重构。为了完美防御空回滚、幂等性、业务悬挂,并做到数据并发隔离,我们需要:

  • 引入 TCC 专属的控制日志表(或利用状态机):用来人肉记录每一个分支事务的执行状态。
  • 重构业务逻辑做资源预留:
    • 库存服务:不再无脑扣减 count,而是扣减 count 的同时增加 freeze_count(冻结库存)。
    • 账户服务:不再无脑扣减 money,而是扣减 money 的同时增加 freeze_money(冻结余额)。
  • 为了防范异常,我们需要一张 分布式事务控制表(也可以两边服务各建一张)。


数据库表的改动

1
2
3
4
5
6
7
8
9
10
11
12
13
-- 1. 库存表增加冻结字段
ALTER TABLE zdemo_seata_store.t_storage ADD COLUMN `freeze_count` int(11) NOT NULL DEFAULT '0' COMMENT '冻结库存' AFTER `count`;

-- 2. 账户表增加冻结字段
ALTER TABLE zdemo_seata_user.t_account ADD COLUMN `freeze_money` decimal(14,2) NOT NULL DEFAULT '0.00' COMMENT '冻结金额' AFTER `money`;

-- 3. TCC 分支事务控制表(各个微服务本地库都需要有,用来做幂等和防悬挂)
CREATE TABLE IF NOT EXISTS `tcc_transaction_log` (
`xid` VARCHAR(128) NOT NULL COMMENT '全局事务XID',
`branch_id` BIGINT NOT NULL COMMENT '分支事务ID',
`state` INT NOT NULL COMMENT '状态: 1-TRY, 2-CONFIRM, 3-CANCEL',
PRIMARY KEY (`xid`, `branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


接口层设计

TCC 必须定义在 Interface 上。Seata 要求 TCC 的二阶段方法必须通过 @LocalTCC 或在 Feign 接口上通过 @TwoPhaseBusinessAction 进行声明。

库存服务 TCC 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.zdemo.scloud.store.service;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

public interface StorageTccService {

/**
* 一阶段:准备并冻结资源
* TwoPhaseBusinessAction 注解向 TC 注册 TCC 行为
*/
@TwoPhaseBusinessAction(name = "storageTccAction", commitMethod = "confirm", rollbackMethod = "cancel")
boolean prepareDecrease(BusinessActionContext actionContext,
@BusinessActionContextParameter(paramName = "commodityCode") String commodityCode,
@BusinessActionContextParameter(paramName = "count") Integer count);

/**
* 二阶段:提交(真正扣减冻结资源)
*/
boolean confirm(BusinessActionContext actionContext);

/**
* 二阶段:回滚(释放冻结资源)
*/
boolean cancel(BusinessActionContext actionContext);
}

账户服务 TCC 接口:

1
2
3
4
5
6
7
8
9
10
11
public interface AccountTccService {

@TwoPhaseBusinessAction(name = "accountTccAction", commitMethod = "confirm", rollbackMethod = "cancel")
boolean prepareDecrease(BusinessActionContext actionContext,
@BusinessActionContextParameter(paramName = "userId") Long userId,
@BusinessActionContextParameter(paramName = "money") BigDecimal money);

boolean confirm(BusinessActionContext actionContext);

boolean cancel(BusinessActionContext actionContext);
}


库存接口实现层

StorageTccServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package com.zdemo.scloud.store.service;

import com.zdemo.scloud.store.entity.TccTransactionLog;
import com.zdemo.scloud.store.mapper.StorageTccMapper;
import com.zdemo.scloud.store.mapper.TccTransactionLogMapper;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.LocalTCC;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
@LocalTCC // 🌟 必须加在实现类上,开启本地 TCC 代理拦截
public class StorageTccServiceImpl implements StorageTccService {

@Resource
private StorageTccMapper storageTccMapper;
@Resource
private TccTransactionLogMapper tccTransactionLogMapper;

@Override
@Transactional(rollbackFor = Exception.class)
public boolean prepareDecrease(BusinessActionContext actionContext, String commodityCode, Integer count) {
String xid = actionContext != null ? actionContext.getXid() : RootContext.getXID();
Long branchId = actionContext != null ? actionContext.getBranchId() : null;
log.info("[Storage TCC - Try] 开始锁定库存. XID: {}, BranchId: {}", xid, branchId);

TccTransactionLog logExist = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId);
if (logExist != null && logExist.getState() == 3) {
// 绝对不能 return false!必须抛出异常让全局知道 Try 失败了!
throw new RuntimeException("[Storage TCC - Try] 检测到当前分支事务已发生 Cancel 悬挂,强行终止 Try 动作!");
}

// 冻结可用库存
int affectedRows = storageTccMapper.freezeStorage(commodityCode, count);
if (affectedRows == 0) {
throw new RuntimeException("【仓储中心】库存不足,锁定失败!");
}

TccTransactionLog tccLog = new TccTransactionLog();
tccLog.setXid(xid);
tccLog.setBranchId(branchId);
tccLog.setState(1);
tccTransactionLogMapper.insert(tccLog);

// 生产防投毒:手动同步参数到 Seata 异步大盘,双保险防御参数丢失
if (actionContext != null) {
actionContext.addActionContext("commodityCode", commodityCode);
actionContext.addActionContext("count", count);
}
return true;
}

@Override
@Transactional(rollbackFor = Exception.class)
public boolean confirm(BusinessActionContext actionContext) {
String xid = actionContext != null ? actionContext.getXid() : RootContext.getXID();
Long branchId = actionContext != null ? actionContext.getBranchId() : null;

// 生产级防空指针取法(一般在 confirm 或 cancel 打断点就会出现空指针)
Object commodityCodeObj = actionContext != null ? actionContext.getActionContext("commodityCode") : null;
Object countObj = actionContext != null ? actionContext.getActionContext("count") : null;
// 如果由于前台传参或者 Feign 掉队导致真的没拿到上下文参数,直接触发空回滚,不报死空指针
if (commodityCodeObj == null || countObj == null) {
log.warn("[Storage TCC - Confirm] 上下文参数缺失");
return false; // Confirm 缺失核心参数不能无脑说成功,应当返回 false 让 TC 不断重试报警。如果不断重试报警就需要去 seata_server 对应的库中去处理清数据了。
}

TccTransactionLog logExist = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId);
if (logExist != null && logExist.getState() == 2) {
log.info("[Storage TCC - Confirm] 分支已确认过,幂等放行。");
return true;
}

String commodityCode = commodityCodeObj.toString();
Integer count = Integer.parseInt(countObj.toString());

// 🌟 先尝试更新控制表状态(从 1 改为 2),利用数据库行锁锁住这笔流水。 UPDATE tcc_transaction_log SET state = 2 WHERE xid = #{xid} AND branch_id = #{branchId} AND state = 1
int updatedRows = tccTransactionLogMapper.updateStateWithCas(xid, branchId, 2, 1);
if (updatedRows > 0) {
// 状态占坑成功,再真正划转扣减冻结资产,彻底阻断并发重试超卖风险
storageTccMapper.deductFreezeStorage(commodityCode, count);
log.info("[Storage TCC - Confirm] 扣减冻结库存成功,事务完结落盘。");
return true;
} else {
// 占坑失败,再次人肉确认是否是被别的并发重试线程更新成功的
TccTransactionLog doubleCheck = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId);
if (doubleCheck != null && doubleCheck.getState() == 2) {
log.info("[Storage TCC - Confirm] 并发占坑失败,但由于最终状态已为 CONFIRM,允许幂等放行。");
return true;
}
log.error("[Storage TCC - Confirm] 状态扣减失败,且未见成功痕迹,要求 TC 触发重试!");
return false; // 逼迫 TC 保持重试,杜绝资损
}

}

@Override
@Transactional(rollbackFor = Exception.class)
public boolean cancel(BusinessActionContext actionContext) {
String xid = actionContext != null ? actionContext.getXid() : RootContext.getXID();
Long branchId = actionContext != null ? actionContext.getBranchId() : null;

// 生产级防空指针取法(一般在 confirm 或 cancel 打断点就会出现空指针)
Object commodityCodeObj = actionContext != null ? actionContext.getActionContext("commodityCode") : null;
Object countObj = actionContext != null ? actionContext.getActionContext("count") : null;
if (commodityCodeObj == null || countObj == null) {
log.warn("[Storage TCC - Cancel] 上下文参数缺失,极可能一阶段在数据序列化前已超时,触发空回滚兜底。");
}

// 🌟 二阶段幂等最高防线。每次进来,第一件事永远是人肉去数据库查一下这条分支记录到底在不在!
TccTransactionLog logExist = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId);

// 情况 A:表里啥都没有 -> 确定是【空回滚】
if (logExist == null) {
try {
TccTransactionLog tccLog = new TccTransactionLog(xid, branchId, 3); // 3 代表 CANCEL
tccTransactionLogMapper.insert(tccLog); // 挂起防悬挂桩
log.info("[Storage TCC - Cancel] 空回滚防御成功,成功插入 CANCEL 占位记录。");
} catch (DuplicateKeyException e) {
// 极端并发兜底:万一 TC 的两个重试线程同时杀到这里,都看到 logExist == null
// 其中一个 insert 成功,另一个会报主键冲突。这里直接 catch 住,放行,允许幂等成功!
log.warn("[Storage TCC - Cancel] 并发空回滚 insert 冲突,触发底层锁幂等防线,直接放行。");
}
return true;
}

// 情况 B:表里有记录,且已经是 CANCEL(3) 或者 CONFIRM(2) 状态 -> 确定是【重复重试】
if (logExist.getState() == 3 || logExist.getState() == 2) {
log.info("[Storage TCC - Cancel] 检测到当前分支事务已处理过(state={}),直接幂等放行。", logExist.getState());
return true;
}

// 情况 C:表里有记录,且状态是 TRY(1) -> 确定是一阶段成功后的【正向逆操作回滚】
if (logExist.getState() == 1) {
// 上下文参数
if (commodityCodeObj == null || countObj == null) {
log.error("[Storage TCC - Cancel] 致命级资损隐患:本地有 Try 记录,但分布式上下文丢失参数,库存无法安全解冻!");
throw new RuntimeException("TCC 异步参数丢失,拒绝空回滚,等待 TC 重试或人工介入!"); // 抛出异常,拒绝承认成功!
}
String commodityCode = commodityCodeObj.toString();
Integer count = Integer.parseInt(countObj.toString());

// 🌟 先通过 CAS 更新控制表状态(从 1 改为 3)
int updatedRows = tccTransactionLogMapper.updateStateWithCas(xid, branchId, 3, 1);
if (updatedRows > 0) {
// 状态修改成功,安全解冻资产
storageTccMapper.unfreezeStorage(commodityCode, count);
log.info("[Storage TCC - Cancel] 真正反向解冻资产成功,分布式事务安全回滚。");
return true;
} else {
// CAS 失败,二次确认是否已经被并发线程改成 3 了
TccTransactionLog doubleCheck = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId);
if (doubleCheck != null && doubleCheck.getState() == 3) {
log.info("[Storage TCC - Cancel] 并发逆操作冲突,但当前状态已被成功置为 CANCEL,直接放行。");
return true;
}
log.error("[Storage TCC - Cancel] CAS 回滚占坑失败,要求 TC 重新下发 Cancel 重试!");
return false;
}
}

// 🌟 兜底保障:只有当 logExist.getState() 出现了非 A、B、C 的诡异脏数据时才会走到这里
log.error("[Storage TCC - Cancel] 遇到了未知的事务控制表状态: {}", logExist.getState());
return false;
}
}

StorageTccMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Mapper
public interface StorageTccMapper {

@Update("UPDATE t_storage SET count = count - #{count}, freeze_count = freeze_count + #{count} " +
"WHERE commodity_code = #{commodityCode} AND count >= #{count}")
int freezeStorage(@Param("commodityCode") String commodityCode, @Param("count") Integer count);

@Update("UPDATE t_storage SET freeze_count = freeze_count - #{count} " +
"WHERE commodity_code = #{commodityCode} AND freeze_count >= #{count}")
int deductFreezeStorage(@Param("commodityCode") String commodityCode, @Param("count") Integer count);

@Update("UPDATE t_storage SET count = count + #{count}, freeze_count = freeze_count - #{count} " +
"WHERE commodity_code = #{commodityCode} AND freeze_count >= #{count}")
int unfreezeStorage(@Param("commodityCode") String commodityCode, @Param("count") Integer count);
}

TccTransactionLogMapper

1
2
3
4
5
6
7
8
9
10
11
12
@Mapper
public interface TccTransactionLogMapper {

@Select("SELECT xid, branch_id as branchId, state FROM tcc_transaction_log WHERE xid = #{xid} AND branch_id = #{branchId}")
TccTransactionLog selectByXidAndBranch(@Param("xid") String xid, @Param("branchId") Long branchId);

@Insert("INSERT INTO tcc_transaction_log(xid, branch_id, state) VALUES(#{xid}, #{branchId}, #{state})")
int insert(TccTransactionLog log);

@Update("UPDATE tcc_transaction_log SET state = #{targetState} WHERE xid = #{xid} AND branch_id = #{branchId} AND state = #{oldState}")
int updateStateWithCas(@Param("xid") String xid, @Param("branchId") Long branchId, @Param("targetState") Integer targetState, @Param("oldState") Integer oldState);
}

TccTransactionLog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.zdemo.scloud.store.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TccTransactionLog {
private String xid;
private Long branchId;
private Integer state; // 1-TRY, 2-CONFIRM, 3-CANCEL
}


账户接口实现层

AccountTccServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package com.zdemo.scloud.user.service;

import com.zdemo.scloud.user.entity.TccTransactionLog;
import com.zdemo.scloud.user.mapper.AccountTccMapper;
import com.zdemo.scloud.user.mapper.TccTransactionLogMapper;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.LocalTCC;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;

@Slf4j
@Service
@LocalTCC
public class AccountTccServiceImpl implements AccountTccService {

@Resource
private AccountTccMapper accountTccMapper;
@Resource
private TccTransactionLogMapper tccTransactionLogMapper;

@Override
@Transactional(rollbackFor = Exception.class)
public boolean prepareDecrease(BusinessActionContext actionContext, Long userId, BigDecimal money) {
// 控空防御,防止第一行引发 NPE
String xid = actionContext != null ? actionContext.getXid() : RootContext.getXID();
Long branchId = actionContext != null ? actionContext.getBranchId() : null;
log.info("[Account TCC - Try] 开始预留资金. XID: {}, BranchId: {}, Money: {}", xid, branchId, money);

// 1. 强力防悬挂
TccTransactionLog logExist = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId);
if (logExist != null && logExist.getState() == 3) {
// 🌟 绝对不能 return false!必须抛出异常让全局感知 Try 失败!
throw new RuntimeException("[Account TCC - Try] 检测到当前分支事务已发生 Cancel 悬挂,强行终止 Try 动作!");
}

// 2. 资产锁定(扣减余额,增加冻结)
int affectedRows = accountTccMapper.freezeMoney(userId, money);
if (affectedRows == 0) {
throw new RuntimeException("【账户中心】余额不足或账户异常,冻结资金失败!");
}

// 3. 记录 Try 成功状态
TccTransactionLog tccLog = new TccTransactionLog();
tccLog.setXid(xid);
tccLog.setBranchId(branchId);
tccLog.setState(1); // 1-TRY
tccTransactionLogMapper.insert(tccLog);

// 4. 手动同步参数到 Seata 异步上下文,双保险防御参数丢失
if (actionContext != null) {
actionContext.addActionContext("userId", userId);
actionContext.addActionContext("money", money);
}
return true;
}

@Override
@Transactional(rollbackFor = Exception.class)
public boolean confirm(BusinessActionContext actionContext) {
// 二阶段开头做安全控空防御
String xid = actionContext != null ? actionContext.getXid() : RootContext.getXID();
Long branchId = actionContext != null ? actionContext.getBranchId() : null;

Object userIdObj = actionContext != null ? actionContext.getActionContext("userId") : null;
Object moneyObj = actionContext != null ? actionContext.getActionContext("money") : null;
if (userIdObj == null || moneyObj == null) {
log.warn("[Account TCC - Confirm] 上下文参数缺失");
return false; // 核心参数缺失不能无脑说成功,应当返回 false 让 TC 保持重试报警
}

// 1. 幂等校验
TccTransactionLog logExist = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId);
if (logExist != null && logExist.getState() == 2) {
log.info("[Account TCC - Confirm] 该分支已提交,幂等放行。");
return true;
}

Long userId = Long.valueOf(userIdObj.toString());
BigDecimal money = new BigDecimal(moneyObj.toString());

// 🌟 核心优化:先尝试用 CAS 更新控制表状态(从 1 改为 2),利用数据库行锁锁住这笔流水
int updatedRows = tccTransactionLogMapper.updateStateWithCas(xid, branchId, 2, 1);
if (updatedRows > 0) {
// 状态占坑成功,再真正解冻并扣减金额,阻断并发重试导致的二次扣款
accountTccMapper.deductFreezeMoney(userId, money);
log.info("[Account TCC - Confirm] 真正扣款落盘成功,事务完结。");
return true;
} else {
// 占坑失败,二次确认是否是被别的并发重试线程更新成功的
TccTransactionLog doubleCheck = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId);
if (doubleCheck != null && doubleCheck.getState() == 2) {
log.info("[Account TCC - Confirm] 并发占坑失败,但由于最终状态已为 CONFIRM,允许幂等放行。");
return true;
}
log.error("[Account TCC - Confirm] 状态占坑失败,且未见成功痕迹,要求 TC 触发重试!");
return false;
}
}

@Override
@Transactional(rollbackFor = Exception.class)
public boolean cancel(BusinessActionContext actionContext) {
// 二阶段开头做安全控空防御
String xid = actionContext != null ? actionContext.getXid() : RootContext.getXID();
Long branchId = actionContext != null ? actionContext.getBranchId() : null;

Object userIdObj = actionContext != null ? actionContext.getActionContext("userId") : null;
Object moneyObj = actionContext != null ? actionContext.getActionContext("money") : null;

if (userIdObj == null || moneyObj == null) {
log.warn("[Account TCC - Cancel] 上下文参数缺失,极可能一阶段在数据序列化前已超时,触发空回滚兜底。");
}

TccTransactionLog logExist = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId);

// 情况 A:空回滚防悬挂处理
if (logExist == null) {
try {
TccTransactionLog tccLog = new TccTransactionLog(xid, branchId, 3); // 3-CANCEL
tccTransactionLogMapper.insert(tccLog);
log.info("[Account TCC - Cancel] 空回滚防御成功,成功向控制表打入 CANCEL 占位桩。");
} catch (DuplicateKeyException e) {
log.warn("[Account TCC - Cancel] 并发空回滚发生主键冲突,底层锁幂等放行成功。");
}
return true;
}

// 情况 B:重复重试,幂等放行
if (logExist.getState() == 3 || logExist.getState() == 2) {
log.info("[Account TCC - Cancel] 该分支状态为 {},无需重复回滚,幂等放行。", logExist.getState());
return true;
}

// 情况 C:正常逆操作回滚
if (logExist.getState() == 1) {
if (userIdObj == null || moneyObj == null) {
log.error("[Account TCC - Cancel] 致命级资损隐患:本地有 Try 记录,但分布式上下文丢失参数,资产无法安全解冻!");
throw new RuntimeException("TCC 异步参数丢失,拒绝空回滚,等待 TC 重试或人工介入!");
}
Long userId = Long.valueOf(userIdObj.toString());
BigDecimal money = new BigDecimal(moneyObj.toString());

// 🌟 核心优化:先通过 CAS 更新控制表状态(从 1 改为 3)
int updatedRows = tccTransactionLogMapper.updateStateWithCas(xid, branchId, 3, 1);
if (updatedRows > 0) {
// 状态修改成功,安全解冻资产,把冻结金额吐回可用余额
accountTccMapper.unfreezeMoney(userId, money);
log.info("[Account TCC - Cancel] 真正反向解冻资金成功,分布式事务安全回滚。");
return true;
} else {
// CAS 失败,二次确认是否已经被并发线程改成 3 了
TccTransactionLog doubleCheck = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId);
if (doubleCheck != null && doubleCheck.getState() == 3) {
log.info("[Account TCC - Cancel] 并发逆操作冲突,但当前状态已被成功置为 CANCEL,直接放行。");
return true;
}
log.error("[Account TCC - Cancel] CAS 回滚占坑失败,要求 TC 重新下发 Cancel 重试!");
return false;
}
}

// 🌟 脏数据防御机制
log.error("[Account TCC - Cancel] 遇到了未知的事务控制表状态: {}", logExist.getState());
return false;
}
}

AccountTccMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Mapper
public interface AccountTccMapper {

// 🌟 核心:更新可用余额的同时追加冻结金额,利用 SQL 层面防御并发负数
@Update("UPDATE t_account SET money = money - #{money}, freeze_money = freeze_money + #{money} " +
"WHERE user_id = #{userId} AND money >= #{money}")
int freezeMoney(@Param("userId") Long userId, @Param("money") BigDecimal money);

@Update("UPDATE t_account SET freeze_money = freeze_money - #{money} " +
"WHERE user_id = #{userId} AND freeze_money >= #{money}")
int deductFreezeMoney(@Param("userId") Long userId, @Param("money") BigDecimal money);

@Update("UPDATE t_account SET money = money + #{money}, freeze_money = freeze_money - #{money} " +
"WHERE user_id = #{userId} AND freeze_money >= #{money}")
int unfreezeMoney(@Param("userId") Long userId, @Param("money") BigDecimal money);

int updateStateWithCas(@Param("xid") String xid,
@Param("branchId") Long branchId,
@Param("targetState") Integer targetState,
@Param("oldState") Integer oldState);
}

TccTransactionLogMapper

1
2
3
4
5
6
7
8
9
10
11
12
@Mapper
public interface TccTransactionLogMapper {

@Select("SELECT xid, branch_id as branchId, state FROM tcc_transaction_log WHERE xid = #{xid} AND branch_id = #{branchId}")
TccTransactionLog selectByXidAndBranch(@Param("xid") String xid, @Param("branchId") Long branchId);

@Insert("INSERT INTO tcc_transaction_log(xid, branch_id, state) VALUES(#{xid}, #{branchId}, #{state})")
int insert(TccTransactionLog log);

@Update("UPDATE tcc_transaction_log SET state = #{targetState} WHERE xid = #{xid} AND branch_id = #{branchId} AND state = #{oldState}")
int updateStateWithCas(@Param("xid") String xid, @Param("branchId") Long branchId, @Param("targetState") Integer targetState, @Param("oldState") Integer oldState);
}

TccTransactionLog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.zdemo.scloud.user.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TccTransactionLog {
private String xid;
private Long branchId;
private Integer state; // 1-TRY, 2-CONFIRM, 3-CANCEL
}


暴露 Feign API 实现

下游微服务提供方在各自的对外 Controller 中暴露该 TCC 的 Try 接口即可:

StorageController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
@RestController
@RequestMapping("/storage")
public class StorageController {

// @Resource
// private StorageService storageService;
@Resource
private StorageTccService storageTccService;

@PostMapping("/decrease")
public Result<Void> decrease(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) {
log.info("▶▶▶ [仓储服务] 收到扣减库存请求. XID: {}", RootContext.getXID()); // 通过 RootContext 抓取 XID,能看到说明 Seata 事务上下文成功通过 Feign 偷渡过来了!

// 外部 Feign 直接调用 Try 阶段接口。 actionContext 直接传 null,由于有了 @LocalTCC 和接口注入,这里的 null 在进入方法前会被 Seata 自动偷天换日,注入真实的上下文!
// storageService.decrease(commodityCode, count);
storageTccService.prepareDecrease(null, commodityCode, count);
return Result.success(null);
}
}

AccountController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
@RestController
@RequestMapping("/account")
public class AccountController {

// @Resource
// private AccountService accountService;
@Resource
private AccountTccService accountTccService;

@PostMapping("/decrease")
public Result<Void> decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) {
log.info("▶▶▶ [账户服务] 收到扣减余额请求. XID: {}", RootContext.getXID());

// 🌟 Controller 层直接传 null,进入 Seata AOP 切面后会自动拦截并填充真实上下文
// accountService.decrease(userId, money);
accountTccService.prepareDecrease(null, userId, money);
return Result.success(null);
}
}

Order 订单发起方(TM)保持不变。由于 TCC 良好的抽象性,作为全局事务发起者的 OrderService 除了打印日志之外,不需要对全链路做出任何核心代码的修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Slf4j
@Service
public class OrderService {

@Resource
private OrderMapper orderMapper;
@Resource
private StoreFeignClient storeFeignClient;
@Resource
private UserFeignClient userFeignClient;

@GlobalTransactional(name = "zdemo-create-order-tx", rollbackFor = Exception.class)
public void createOrder(OrderDTO orderDto) {
log.info("==========================================================================");
log.info("🏁 触发分布式下单大闸 [TCC 工业级标准实现模式]!XID = {}", RootContext.getXID());

// 1. 本地落单(状态依旧为初始 0)
Order order = new Order();
// ... (属性赋值保持一致)
orderMapper.insert(order);
log.info("Step 1 ➔ 本地订单创建成功(一阶段落盘)");

// 2. 远程呼叫库存服务(实质触发了 Storage 侧的 TCC - Try 资源冻结)
storeFeignClient.decrease(orderDto.getCommodityCode(), orderDto.getCount());
log.info("Step 2 ➔ 远程微服务[仓储中心] TCC-Try 锁定库存成功!");

// 3. 远程呼叫账户服务(实质触发了 Account 侧的 TCC - Try 资金冻结)
userFeignClient.decrease(orderDto.getUserId(), orderDto.getMoney());
log.info("Step 3 ➔ 远程微服务[账户中心] TCC-Try 锁定资金成功!");

// 4. 通关更新状态
order.setStatus(1);
orderMapper.updateById(order);
log.info("🎉 Step 4 ➔ 全链路业务预留通过,全局事务准备二阶段自动异步广播 Confirm!");
log.info("==========================================================================");
}
}


其他说明

TCC 模式需要改配置文件的 data-source-proxy-mode 吗? 其实是不需要的,TCC 模式完全不需要修改 data-source-proxy-mode,甚至根本不依赖 Seata 的数据源代理功能!

data-source-proxy-mode(可选值有 AT、XA)是专门给 AT 模式 和 XA 模式 准备的。哪怕你的 data-source-proxy-mode 配置成了 AT 或者是默认值,它也完全不会干扰 TCC 的运行。当一个接口被标记为 TCC 接口时,Seata 会自动走 TCC 的切面逻辑,直接绕过数据源代理拦截。

  • AT/XA 模式 属于 “框架接管事务”,Seata 必须把你的普通数据库连接池(如 Hikari、Druid)人肉包装成 DataSourceProxy,进而拦截你的 SQL 去自动生成 undo_log 或发送 XA PREPARE。
  • TCC 模式 则是完全由业务代码人肉接管事务。一阶段 Try、二阶段 Confirm / Cancel 里面写的都是普通的本地 SQL(利用 Spring 的 @Transactional 提交或回滚)。Seata 在这里只充当一个分布式协调拦截器(通过 AOP 切面记录分支状态),它根本不需要去代理、拦截你的底层数据库连接。

要在项目里顺畅运行 TCC 模式,配置文件(如 application.yml)中只需要确保:

1
2
seata:
enabled: true # 确保开启 Seata 客户端支持

这样Seata 就能扫描到你的 @LocalTCC 注解和 @TwoPhaseBusinessAction 注解,从而利用 AOP 建立分布式调用链路。


AT 与 TCC 的混合部署

混合部署的必要性

在电商分布式大促(如双十一、618)的极限并发场景下,热点商品库存(如 1 元秒杀的 iPhone、爆款国潮大衣)往往是引爆系统雪崩的万恶之源。如果你整个链路全盘采用 Seata AT 模式,由于它依赖数据库的物理排他锁(全局锁),在热点商品全网疯抢时,成千上万的线程会卡在同一个数据库行记录上死等,数据库连接池瞬间被抽干,引发全链路雪崩。这时候的终极解法就是:打破单一模式的银弹幻想,采用 AT 与 TCC 的“混合部署(混合事务)” 对核心链路进行性能松绑。

混合部署的核心思想是:“非核心链路用 AT(买开发效率),核心热点链路用 TCC(买性能)”。 在大促下单的微服务链路中,我们通常这样拆分:

1
2
3
4
5
6
7
8
9
10
# 在 AT 模式下,事务直到二阶段 Commit 之前,数据库的行锁、Redo Log 统统不能释放。
# 而在 TCC 模式下,一阶段 Try 的本地事务执行完就立刻提交、释放数据库物理行锁。
# 它把核心资产通过逻辑字段(如 freeze_count)进行了隔离。这种“快进快出”的逻辑,
# 彻底将数据库从排他锁的并发泥潭里拔了出来。

[用户下单]

├──> 订单服务 (Order Service) ────> 【AT 模式】(生成订单、优惠券,并发不高,业务复杂)

└──> 库存服务 (Storage Service) ───> 【TCC 模式】(热点商品库存,极限并发,不容卡顿)


混合部署的代码落地

Seata 最强大的一点在于,TM(事务管理器)根本不在乎底层是 AT 还是 TCC,它只认 XID

在 Order 下单业务起点挂载全局事务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {

@Resource
private OrderMapper orderMapper; // 本地是 AT 模式的普通数据源代理
@Resource
private StorageFeignClient storageFeignClient; // 远程 TCC 库存服务

@Override
@GlobalTransactional(name = "create-order-tx", rollbackFor = Exception.class)
public void createOrder(OrderOrder order) {
// 1. 【AT 模式】直接插入订单表。Seata 会自动代理数据源,记录 Undo Log
orderMapper.insert(order);

// 2. 【跨服务传递】通过 Feign 调用库存服务。XID 会自动顺着网络传过去
// 远程库存服务的实现是 TCC 范式
storageFeignClient.prepareDecrease(null, order.getCommodityCode(), order.getCount());

// 3. 如果后续发生异常,Seata TC 会指挥订单执行 AT 的 Undo 回滚,同时指挥库存执行 TCC 的 Cancel
}
}

在库存微服务,全面切换为 TCC 架构。我们在库存服务的接口层和实现类上,按照之前打磨好的安全范式进行重构,彻底卸载物理行锁。接口层(向 Seata 注册 TCC 行为):

1
2
3
4
5
6
7
8
9
public interface StorageTccService {
// TwoPhaseBusinessAction 声明了这是一个 TCC 分支,并绑定二阶段方法
@TwoPhaseBusinessAction(name = "storageTccAction", commitMethod = "confirm", rollbackMethod = "cancel")
boolean prepareDecrease(
BusinessActionContext actionContext,
@BusinessActionContextParameter(paramName = "commodityCode") String commodityCode,
@BusinessActionContextParameter(paramName = "count") Integer count
);
}

实现类(性能松绑的关键),在一阶段 prepareDecrease 中,利用业务账目隔离代替物理死等:

1
2
// SQL 逻辑:update storage set usable_count = usable_count - ?, freeze_count = freeze_count + ? where commodity_code = ? and usable_count >= ?
int affectedRows = storageTccMapper.freezeStorage(commodityCode, count);

执行完这句 SQL 并在 prepareDecrease 返回后,当前线程在数据库里的物理事务已经 Commit 并释放锁了! 如果此时有第二个线程来抢购同一件商品,它不需要等待第一个事务的二阶段决议,直接就可以进去执行第二句 freezeStorage。多个事务并发修改同一行的不同部分(扣减可用、增加冻结),数据库的行锁停留时间从“秒级”缩短到了“毫秒级”!


TCC + 异步缓冲

如果混合部署后,数据库行更新(Row Update)的 TPS 依然达到了物理硬件上限(如单机 MySQL 每秒几千次硬编码更新上限),我们可以基于 TCC 玩出更激进的“热点扣减缓冲”:

  • 一阶段 Try 走 Redis:在 prepareDecrease 里,不直接扣减 MySQL,而是利用 Lua 脚本扣减 Redis 里的热点库存,并在 Redis 里记录冻结痕迹。
  • 异步批量落盘:本地开启一个定时任务,或者通过消息队列(MQ),把 Redis 里的扣减结果批量合并(例如:把 100 次扣减 1 整合为 1 次扣减 100),然后批量更新 MySQL。
  • 二阶段 Confirm/Cancel
    • 如果全局成功,Confirm 只负责清除 Redis 里的冻结标记,并标记 MySQL 异步对账完成。
    • 如果全局失败,Cancel 负责把 Redis 里的可用库存吐回去,保证绝对不资损。


收益与代价

在大促复盘时,这种混合部署的架构能够带来质的飞跃:

  • 吞吐量暴涨:库存模块由于物理锁被“降级”为逻辑状态锁,热点行冲突引起的数据库雪崩彻底消失,热点链路吞吐量通常能获得数倍到十倍的提升。
  • 资产安全无忧:订单、优惠券等资产依然保留在 AT 模式的强一致性守护下,不需要程序员去人肉为每个微服务手写复杂的 Cancel 补偿逻辑,研发效率得到了兼顾。

代价就是:

  • 针对 TCC,程序员需要处理极其烧脑的高并发边缘场景。诸如空回滚、幂等、防悬挂、CAS 状态占坑、二阶段参数丢失投毒。稍有不慎,就会在生产环境引爆严重的资金损失或账目对不上。
  • 丧失了全局“读隔离”,引来脏读与超卖体验问题。AT 模式依赖 Seata 的全局锁。如果事务 A 正在修改库存,事务 B 在 A 没最终提交前,是查不到、也改不了这笔库存的,具有极高的隔离性。TCC 追求最终一致性。在一阶段 Try 完结后,可用库存已经减少,冻结库存已经增加。如果此时全局事务在二阶段翻车了,触发 Cancel 倒车释放。用户在前端会看到“库存已抢光”,但过了 2 秒刷新一看,由于别的事务回滚了,“库存居然又离奇地吐出来了”。这种数据在中间态的“软状态”抖动,会带来一定程度上的用户体验魔幻感。
  • 运维与分布式链路追踪的复杂度翻倍。在纯 AT 或纯 TCC 架构中,事务的排查路径是单一的。一旦混合部署,当一个分布式事务挂掉时,你必须在日志中同时去抠 AT 的全局锁、Undo Log 镜像回滚轨迹,以及 TCC 的控制流水表状态机演变线。一旦由于突发断电导致 Seata 协调器(TC)大盘死账积压,你既要清理 undo_log 表,又要手动去人肉核对 TCC 流水表里的 state=1 悬挂数据,运维负担极大。


Saga 模式的说明

Saga 模式的简介

Saga 模式是分布式事务领域里的“长跑健壮型选手”。如果说 TCC 是“强迫症式的精细化资产锁定”,那么 Saga 就是“敢作敢当、错了再改的补偿机制”。它不要求你在第一阶段去“冻结”或者“预留”任何资产,而是直接真刀真枪地把业务走出一步,如果后面有人翻车了,它再倒回去执行反向操作。它和 TCC 的核心区别是:TCC 必须要经历 Try 这一步中间态(冻结资产)。而 Saga 没有中间态,一上来 $T_1$ 就直接把库存扣了,把钱划走了。

Saga 的思想非常朴素:它把一个长分布式事务拆分成一系列的 本地局部事务 $T_1, T_2, \dots, T_n$。 每一个正向事务 $T_i$ 都有一个与之对应的逆向补偿事务 $C_i$。它的执行无非就两种结局:

一种是顺风顺水(一路成功):所有正向动作顺利走完,分布式事务安全结案。

$$T_1 \rightarrow T_2 \rightarrow T_3 \dots \rightarrow T_n$$

另一种是突发翻车(中途失败与回滚):假设走到 $T_3$ 时失败了(比如用户卡里的钱不够了或者库存突然空了),Saga 协调器(在 Seata 里叫状态机)就会启动倒车补偿流,先原地停下,然后按照原路反向依次调用补偿动作$C_2, C_1$。

$$T_1 \rightarrow T_2 \rightarrow T_3(\text{失败}) \rightarrow C_2 \rightarrow C_1$$


Saga 模式的落地

在 Seata 中,实现 Saga 并不需要你像 TCC 那样写一堆注解,它是基于 状态机(State Machine) 来玩的。

Seata 提供了一个专门的引擎,你作为开发者,需要编写一个 JSON 配置文件(这个文件被称为状态机定义)。在 JSON 里,你像画流程图一样定义好(可以在 Seata 前端控制台进行设计):

  • 第一步调用哪个微服务的哪个 Service 方法(正向 $T_1$)。
  • 如果这一步失败了,应该调用哪个 Service 方法去冲正(逆向 $C_1$)。
  • 下一步往哪儿走(路由条件)。

此时你的 Java 业务代码会变得极度纯粹,全都是标准的本地业务方法。具体说明:

正向业务方法(库存微服务):

1
2
3
4
5
public void reduceInventory(String commodityCode, Integer count) {
// 没有任何 TCC 预留语义,直接 UPDATE 扣减可用库存!
int rows = storageMapper.reduceRealStorage(commodityCode, count);
if (rows == 0) throw new RuntimeException("库存不足!");
}

对应的逆向补偿方法(库存微服务):

1
2
3
4
public void compensateInventory(String commodityCode, Integer count) {
// 补偿动作:直接把刚才扣掉的库存加回去
storageMapper.addRealStorage(commodityCode, count);
}


向前还是向后

当分布式事务执行到一半报错时,Seata 的 Saga 状态机引擎支持两种补偿救场策略:


优缺点与生产选型

优势(为什么选它?)

  • 超长链路的救星:TCC 和 AT 模式都需要长久占着数据库连接和锁(从 Try 一直卡到 Confirm),如果微服务链路太长、跨越好几个外部三方系统(比如调用了民航、银联接口),系统会直接卡死。Saga 每个步骤都是本地事务,随用随释放,并发吞吐量极高。
  • 拯救老旧系统(Legacy System):很多遗留系统或外部第三方 API(比如发短信、充话费、出机票),压根不可能为了配合你改造去提供 Try、Confirm 接口。人家就一个 Order 接口(正向)和一个 Refund 退款接口(逆向)。这种情况下,只有 Saga 能带着它们一起玩。


致命痛点(生产必须面对的代价)

由于 Saga 是一上来就直接改了本地数据库(落盘结案),这就会带来可怕的 “脏写 (Dirty Write)” 和 “脏读”。比如,你的机票本来只剩 1 张了。事务 A 进来执行 $T_1$,直接把这张机票买走了(此时事务 A 还在跑后面的步骤,没彻底完结)。 就在这一瞬间,真实的用户去查机票,发现 “卖完了”。 紧接着,事务 A 在 $T_2$ 翻车了,触发 Saga 补偿 $C_1$,又把机票退了回来。 刚刚那个来看机票的用户就会觉得见鬼了——明明前一秒看没票了,后一秒看又有票了。这就是缺乏隔离性带来的体验魔幻。


什么时候掏出 Saga?

  • 能用 AT 模式就用 AT(无侵入,最省心)
  • 对资产、金钱敏感且并发极高的核心链路用 TCC(精细锁定,体验好)
  • 链路长、调用了外部三方系统、或者老旧系统无法改造时,果断上 Saga。